DB_URI
指向数据库
In [8]:
import os
import numpy as np
import pandas as pd
from cvxpy import *
from cvxopt import *
from alphamind.api import *
from alphamind.cython.optimizers import QPOptimizer
In [9]:
risk_penlty = 0.5
ref_date = '2018-02-08'
engine = SqlEngine(os.environ['DB_URI'])
universe = Universe('ashare_ex')
codes = engine.fetch_codes(ref_date, universe)
risk_cov, risk_exposure = engine.fetch_risk_model(ref_date, codes)
factor = engine.fetch_factor(ref_date, 'EPS', codes)
total_data = pd.merge(factor, risk_exposure, on='code').dropna()
all_styles = risk_styles + industry_styles + macro_styles
risk_exposure_values = total_data[all_styles].values.astype(float)
special_risk_values = total_data['srisk'].values.astype(float)
risk_cov_values = risk_cov[all_styles].values
sec_cov_values_full = risk_exposure_values @ risk_cov_values @ risk_exposure_values.T / 10000 + np.diag(special_risk_values ** 2) / 10000
signal_full = total_data['EPS'].values
In [10]:
n = 200
sec_cov_values = sec_cov_values_full[:n, :n]
signal = signal_full[:n]
In [11]:
%%time
w = Variable(n)
lbound = 0.
ubound = 1. / n * 20
risk = sum_squares(multiply(special_risk_values[:n] / 100., w)) + quad_form((w.T * risk_exposure_values[:n]).T, risk_cov_values[:n, :n] / 10000.)
objective = Minimize(risk_penlty * risk - signal * w)
constraints = [w >= lbound,
w <= ubound,
sum(w) == 1,]
prob = Problem(objective, constraints)
In [12]:
%%time
prob.solve(verbose=True)
Out[12]:
In [13]:
prob.status, prob.value
Out[13]:
In [14]:
%%time
prob.solve(verbose=True, solver='ECOS')
Out[14]:
In [15]:
prob.status, prob.value
Out[15]:
In [16]:
%%time
P = matrix(sec_cov_values)
q = -matrix(signal)
G = np.zeros((2*n, n))
h = np.zeros(2*n)
for i in range(n):
G[i, i] = 1.
h[i] = 1. / n * 20
G[i+n, i] = -1.
h[i+n] = 0.
G = matrix(G)
h = matrix(h)
A = np.ones((1, n))
b = np.ones(1)
A = matrix(A)
b = matrix(b)
sol = solvers.qp(P, q, G, h, A, b)
In [17]:
%%time
lbound = np.zeros(n)
ubound = np.ones(n) * 20 / n
cons_matrix = np.ones((1, n))
clb = np.ones(1)
cub = np.ones(1)
qpopt = QPOptimizer(signal,
None,
lbound,
ubound,
cons_matrix,
clb,
cub,
1.,
risk_cov_values[:n, :n] / 10000.,
risk_exposure_values[:n],
special_risk_values[:n] * special_risk_values[:n] / 10000.)
qpopt.feval()
qpopt.status()
In [18]:
import datetime as dt
In [19]:
def time_function(py_callable, n):
start = dt.datetime.now()
val = py_callable(n)
return (dt.datetime.now() - start).total_seconds(), val
In [20]:
def cvxpy(n):
w = Variable(n)
lbound = 0.
ubound = 0.01
risk = sum_squares(multiply(special_risk_values[:n] / 100., w)) + quad_form((w.T * risk_exposure_values[:n]).T, risk_cov_values[:n, :n] / 10000.)
objective = Minimize(risk_penlty * risk - signal * w)
constraints = [w >= lbound,
w <= ubound,
sum(w) == 1,]
prob = Problem(objective, constraints)
prob.solve(verbose=False, solver='ECOS')
return prob.value
In [21]:
def cvxopt(n):
P = matrix(sec_cov_values)
q = -matrix(signal)
G = np.zeros((2*n, n))
h = np.zeros(2*n)
for i in range(n):
G[i, i] = 1.
h[i] = 0.01
G[i+n, i] = -1.
h[i+n] = 0.
G = matrix(G)
h = matrix(h)
A = np.ones((1, n))
b = np.ones(1)
A = matrix(A)
b = matrix(b)
solvers.options['show_progress'] = False
sol = solvers.qp(P, q, G, h, A, b)
return sol['primal objective']
In [22]:
def ipopt(n):
lbound = np.zeros(n)
ubound = np.ones(n) * 0.01
cons_matrix = np.ones((1, n))
clb = np.ones(1)
cub = np.ones(1)
qpopt = QPOptimizer(signal, None, lbound, ubound, cons_matrix, clb, cub, 1.,
risk_cov_values[:n, :n] / 10000.,
risk_exposure_values[:n],
special_risk_values[:n] * special_risk_values[:n] / 10000.)
return qpopt.feval()
In [23]:
n_steps = list(range(200, 3201, 200))
cvxpy_times = [None] * len(n_steps)
cvxopt_times = [None] * len(n_steps)
ipopt_times = [None] * len(n_steps)
print("{0:<8}{1:>12}{2:>12}{3:>12}".format('Scale(n)', 'cvxpy', 'cvxopt', 'ipopt'))
for i, n in enumerate(n_steps):
sec_cov_values = sec_cov_values_full[:n, :n]
signal = signal_full[:n]
cvxpy_times[i], val1 = time_function(cvxpy, n)
cvxopt_times[i], val2 = time_function(cvxopt, n)
ipopt_times[i], val3 = time_function(ipopt, n)
np.testing.assert_almost_equal(val1, val2, 4)
np.testing.assert_almost_equal(val2, val3, 4)
print("{0:<8}{1:>12.4f}{2:>12.4f}{3:>12.4f}".format(n, cvxpy_times[i], cvxopt_times[i], ipopt_times[i]))
In [ ]: